Apache AirflowのTriggerDagRunOperatorをバージョン2対応する
Apache AirflowのAWSマネージドサービスであるAmazon MWAAが、少し前にバージョン2対応しました。
その際 TriggerDagRunOperator
の仕様が変更されていて、動作の違いについて調べて対応したのでまとめておきます。
TriggerDagRunOperatorとは
バージョン1の場合
下記の例のように trigger_dag_id
にDAGのIDを指定することで、そのDAGを起動することができます。
その際 python_callable
に関数を指定することで、関数内で payload
として作成した任意の値を dag_run.conf
から参照することが可能です。
この値は、別のタスクで使用すること可能です。
def trigger_task_func(context, object): object.payload = {"message": "Hello world"} return object trigger_task = TriggerDagRunOperator( task_id='trigger_task', trigger_dag_id="target_dag", python_callable=trigger_task_func, )
バージョン2の場合
バージョン1で使用できた python_callable
が使用できなくなっています。
その代わり、conf
というパラメータが指定可能となり、直接任意の値を渡すことが可能です。
trigger_task = TriggerDagRunOperator( task_id='trigger_task', trigger_dag_id="target_dag", conf={"message": "Hello world"}, )
仕様の変更については、こちらのPull Requestに記されています。
It removes the python_callable argument and is thus not backwards compatible so should be merged in Airflow 2.0. Also (I think), people might have "abused" this weird DagRunOrder class to set their dagrun id. This PR removes that possibility.
↓翻訳
これは python_callable 引数を削除するもので、後方互換性がないため、Airflow 2.0 にマージされるべきです。また、(私が思うに)人々はこの奇妙な DagRunOrder クラスを "悪用して" 彼らの dagrun id を設定していたかもしれません。このPRはその可能性を取り除きます。
そのため、バージョン1で python_callable
に関数を渡して何かしらの処理をしていた場合、PythonOperator
などで処理を分けて事前に実行する必要があります。
Pull Request内でも例として紹介されています。
python_callableに指定した関数内で作成した値を他のタスクで使用する
今回の移行対象のプロジェクトでは、python_callable
に指定した関数内で動的に作成した値が、他のタスクで dag_run.conf
を通して使用されていました。
def initialize(context, object): hoge = "渡したい値1" fuga = "渡したい値2" object.payload = { "hoge": hoge, "fuga": fuga } return object trigger_task = TriggerDagRunOperator( task_id='trigger_task', trigger_dag_id="second_dag", python_callable=initialize, )
another_task = PythonOperator( task_id='another_task', op_kwargs={ 'hoge': "{{ dag_run.conf['hoge'] }}", 'fuga': "{{ dag_run.conf['fuga'] }}" } )
バージョン2では python_callable
は使えないため、別の方法で他のタスクに値を渡す必要がありました。
対応方法はいくつかあるかと思いますが、今回は xcom の機能を利用する方法を紹介します。
利用方法は下記の通りです。
def push(**context): hoge = "渡したい値1" fuga = "渡したい値2" return { "hoge": hoge, "fuga": fuga } initialize = PythonOperator( task_id='initialize', python_callable=push, provide_context=True, ) trigger_task = TriggerDagRunOperator( task_id='trigger_task', trigger_dag_id="second_dag", conf={ 'hoge': "{{ ti.xcom_pull(task_ids='initialize')['hoge'] }}", 'fuga': "{{ ti.xcom_pull(task_ids='initialize')['fuga'] }}" } ) initialize >> trigger_task
another_task = PythonOperator( task_id='another_task', op_kwargs={ 'hoge': "{{ dag_run.conf['hoge'] }}", 'fuga': "{{ dag_run.conf['fuga'] }}" } )
バージョン1の python_callable
に渡した関数で作成するような動的な値を、PythonOperator
で別のタスクに分けて作成します。
例では initialize
タスクの push
関数で作成し、戻り値としています。
次の trigger_task
タスクから xcom_pull
を使って取得しています。
以上です!